package com.atguigu.app

import com.alibaba.fastjson.JSON
import com.atguigu.bean.UserInfo
import com.atguigu.constants.GmallConstants
import com.atguigu.util.MyKafkaUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import redis.clients.jedis.Jedis

/**
 * Spark Streaming job that consumes user records from the Kafka topic
 * `GmallConstants.KAFKA_TOPIC_USER` and caches each record's raw JSON in Redis
 * under the key `userInfo:<id>`.
 *
 * The job also prints a sample of the parsed [[UserInfo]] records for every batch.
 */
object UserApp {

  /**
   * Entry point: builds the streaming context, wires the Kafka -> Redis pipeline
   * and blocks until the streaming context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Create the Spark configuration.
    // NOTE(review): the master is hard-coded to local[*]; confirm this is intended
    // before submitting the job to a cluster.
    val sparkConf: SparkConf = new SparkConf().setAppName("UserApp").setMaster("local[*]")

    // 2. Create the StreamingContext with a 5-second batch interval.
    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(5))

    // 3. Consume the user topic from Kafka.
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream(GmallConstants.KAFKA_TOPIC_USER, ssc)

    // 4. Convert each record's JSON payload into the UserInfo case class.
    val userInfoDStream: DStream[UserInfo] = kafkaDStream.mapPartitions { partition =>
      partition.map(record => JSON.parseObject(record.value(), classOf[UserInfo]))
    }
    userInfoDStream.print()

    // 5. Cache the user records in Redis, keyed by user id.
    kafkaDStream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Skip empty partitions so no Redis connection is opened for nothing.
        if (partition.nonEmpty) {
          // One Redis connection per partition, created where the partition is processed.
          val jedis: Jedis = new Jedis("hadoop102", 6379)
          try {
            partition.foreach { record =>
              val json: String = record.value()
              // Parse the JSON only to extract the user id for the Redis key;
              // the value stored in Redis is the original JSON string.
              val parsed: UserInfo = JSON.parseObject(json, classOf[UserInfo])
              // The parser is expected to return null for a null/empty payload;
              // such records are skipped instead of failing with an NPE.
              Option(parsed).foreach { userInfo =>
                jedis.set(s"userInfo:${userInfo.id}", json)
              }
            }
          } finally {
            // Always release the connection, even if parsing or a Redis call throws.
            jedis.close()
          }
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

}
